﻿/**
* CRL
*/
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using CRL.Core;
using CRL.Data;
using CRL.Sqlite;
using CRL.Data.DBAccess;
using CRL.EventBus;
using CRL.EventBus.NetCore;
using CRL.EventBus.Queue;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Reflection.PortableExecutable;
using System.Text;
using System.Threading.Tasks;
using CRL.EventBus.RocketMQ;
namespace MqTest
{
    // Implemented with .NET Core dependency injection
    class Program
    {
        // Service provider assigned once by the static constructor; the test methods resolve their services from it.
        static IServiceProvider provider;
        // SQLite connection string; in this file it is referenced only by the commented-out UseDb configuration.
        const string sqlLiteDb = "Data Source=d:\\sqlliteTest.db;";
        /// <summary>
        /// Builds the service provider shared by every test in this class: registers options,
        /// enables SQLite on the shared <c>DBConfigRegister</c> instance, and adds the event bus
        /// configured for RabbitMQ with subscriptions registered from this assembly.
        /// </summary>
        static Program()
        {
            var services = new ServiceCollection();
            services.AddOptions();

            var dbConfig = DBConfigRegister.GetInstance();
            dbConfig.UseSqlite();

            services.AddEventBus(config =>
            {
                // Callback for removed event data. Per the original note it fires when a message
                // is deleted after its retries time out, so custom handling can go here.
                config.RegisterOnEventDataRemove(data =>
                {
                    Console.WriteLine($"{data.RoutingKey} {data.Id} removed");
                    //throw new Exception("ddd");
                });

                //config.UseMemory();
                // RabbitMQ on localhost with the guest/guest account (demo settings only).
                config.UseRabbitMQ(cfg =>
                {
                    cfg.HostName = "127.0.0.1";
                    cfg.UserName = "guest";
                    cfg.Password = "guest";
                }, opt =>
                {
                    // NOTE(review): presumably RabbitMQ's BasicQos(prefetchSize, prefetchCount, global),
                    // i.e. one unacknowledged message per consumer - confirm against the client library.
                    opt.ConsumerChannelFunc = channel => channel.BasicQos(0, 1, false);
                });

                // Several transports can be configured side by side; a subscription then has to
                // name the one it uses. The alternatives below are left disabled.
                //config.UseRedis(cfg =>
                //{
                //    cfg.ConnString = "Server_204@127.0.0.1:6389";
                //    cfg.UseList = true;
                //});
                //config.UseDb(cfg =>
                //{
                //    cfg.DBType = DBType.SQLITE;
                //    cfg.ConnString = sqlLiteDb;
                //    cfg.ConsumerByNameSingle = true;
                //});
                //config.UseRocketMQ(b =>
                //{
                //    b.Server = "127.0.0.1:8080";
                //    b.AccessKey = "rocketmq2";
                //    b.SecretKey = "12345678";
                //    b.Topic = "eventBusTopic";
                //    b.GroupName = "group1";
                //    b.ConsumerByNameSingle = true;
                //});

                // Register subscriptions from the assembly that contains this class.
                config.RegisterSubscribe(typeof(Program).Assembly);
            });

            provider = services.BuildServiceProvider();
        }
        // Subscription service; assigned in Main and used there to register the demo handlers.
        static SubscribeService subService;
        /// <summary>
        /// Entry point: registers the demo subscriptions, starts the event bus subscribers,
        /// then passes this type to <c>ConsoleTest.DoCommand</c>.
        /// </summary>
        /// <param name="args">Command-line arguments; not read by this method.</param>
        static void Main(string[] args)
        {
            // Resolved up front as before; the instance itself is not used in this method.
            var client = provider.GetService<IPublisher>();
            subService = provider.GetService<SubscribeService>();

            // ---- Custom subscriptions ----

            // Subscription bound to a specific MQType.
            // NOTE(review): the handler is typed MQType while testWithMqType() publishes a string,
            // and UseDb is commented out in the static constructor - confirm this pairing works.
            var mqTypeOptions = new SubscribeAttribute { Name = "testWithMqType", MQType = MQType.Db };
            subService.AddSubscribe<MQType>(mqTypeOptions, message =>
            {
                Console.WriteLine($"testWithMqType recieve {message}");
                return true;
            });

            // Delay-queue subscription: prints the time elapsed since the published timestamp.
            var delayOptions = new SubscribeAttribute { Name = "testDelay", DelayQueue = true };
            subService.AddSubscribe<DateTime>(delayOptions, publishedAt =>
            {
                Console.WriteLine($"testDelay ts {DateTime.Now - publishedAt}");
                return true;
            });

            // Handler that always throws; per the original note an exception triggers a retry,
            // with the delay supplied by GetDelayTime.
            var retryOptions = new SubscribeAttribute { Name = "testRetry", RetryTimes = 3, RetryPolicy = GetDelayTime };
            subService.AddSubscribeAsync<int>(retryOptions, async value =>
            {
                await Task.Delay(10);
                Console.WriteLine($"{DateTime.Now:yy-MM-dd HH:mm:ss fffff} Exception throw {value}");
                throw new Exception($"Retry recieve {value}");
            });

            // Handler that always returns false; per the original note that also triggers a retry.
            var retry2Options = new SubscribeAttribute { Name = "testRetry2", RetryTimes = 2 };
            subService.AddSubscribe<int>(retry2Options, value =>
            {
                Console.WriteLine($"testRetry2 recieve {value}");
                return false;
            });

            // Synchronous variant of the request handler, left disabled:
            //subService.AddSubscribe<DateTime>(new SubscribeAttribute { Name = "testSendReq" }, b =>
            //{
            //    //Console.WriteLine($"testSendReq recieve {b}");
            //    return $"reqResponse1 {DateTime.Now}";
            //});

            // Request/response handler: replies with a string carrying the current time.
            var requestOptions = new SubscribeAttribute { Name = "testSendReq" };
            subService.AddSubscribeAsync<DateTime>(requestOptions, async requestedAt =>
            {
                await Task.Delay(10);
                //Console.WriteLine($"testSendReq recieve {requestedAt}");
                return $"reqResponse1 {DateTime.Now}";
            });

            // Handler for the items published by testSendBatch().
            // NOTE(review): also bound to MQType.Db, which is not configured while UseDb is commented out.
            var batchOptions = new SubscribeAttribute { Name = "testBatch", MQType = MQType.Db, ThreadSleepSecond = 10 };
            subService.AddSubscribe<batchObj>(batchOptions, item =>
            {
                Console.WriteLine($"args is {item}");
                return true;
            });

            provider.RunEventBusSubscribe();

            // NOTE(review): presumably exposes this class's static test methods as console commands - confirm.
            ConsoleTest.DoCommand(typeof(Program));
        }
        #region inner
        /// <summary>
        /// Retry policy passed as <c>RetryPolicy</c> for the "testRetry" subscription.
        /// Starts from 500 and grows by half (integer-truncated) once per step, where the
        /// step count is <c>RetryTimes</c> wrapped modulo 10 so the curve restarts every 10 retries.
        /// </summary>
        /// <param name="d">Event data; only <c>Time</c> and <c>RetryTimes</c> are read.</param>
        /// <returns>
        /// The computed delay (milliseconds, judging by the original variable name), or -1 when
        /// the data is more than one day old (per the original note: stop resending).
        /// </returns>
        static int GetDelayTime(IData d)
        {
            // Give up on anything older than one day.
            var age = DateTime.Now - d.Time;
            if (age.TotalDays > 1)
            {
                return -1;
            }

            // Wrap positive retry counts into 0..9; non-positive values are left untouched.
            var attempt = d.RetryTimes;
            if (attempt > 0)
            {
                attempt %= 10;
            }

            // Grow the delay by 50% for each step from 0 through attempt inclusive.
            var delayMs = 500;
            var step = 0;
            while (step <= attempt)
            {
                delayMs += delayMs / 2;
                step++;
            }
            return delayMs;
        }
        #endregion
        /// <summary>Publishes the current clock second (an int) under the name "test".</summary>
        public static void test()
        {
            var publisher = provider.GetService<IPublisher>();
            var second = DateTime.Now.Second;
            publisher.Publish("test", second);
        }
        /// <summary>Publishes the current date/time under the name "testAsync".</summary>
        public static void testAsync()
        {
            var publisher = provider.GetService<IPublisher>();
            var now = DateTime.Now;
            publisher.Publish("testAsync", now);
        }
        /// <summary>
        /// Publishes 50 messages under "test", alternating priority 0 (even index) and 10 (odd index).
        /// Each message's payload is its own priority value.
        /// </summary>
        public static void testPriority()
        {
            var publisher = provider.GetService<IPublisher>();
            var index = 0;
            while (index < 50)
            {
                // Declared inside the loop so each lambda captures its own value.
                int priority;
                if (index % 2 == 0)
                {
                    priority = 0;
                }
                else
                {
                    priority = 10;
                }

                publisher.Publish("test", priority, opt => { opt.Priority = priority; });
                index++;
            }
        }
        /// <summary>
        /// Publishes the current date/time under "testDelay" with <c>DelayMs</c> set to 3000
        /// (delay-queue send, per the original note).
        /// </summary>
        public static void testDelayPublish()
        {
            var publisher = provider.GetService<IPublisher>();
            var sentAt = DateTime.Now;
            publisher.Publish("testDelay", sentAt, opt => { opt.DelayMs = 3000; });
        }
        /// <summary>
        /// Obtains the publisher for <c>MQType.Db</c> from the <c>PublisherFactory</c> and
        /// publishes a string under "testWithMqType".
        /// </summary>
        /// <remarks>
        /// NOTE(review): unlike the other test methods this one is not public, and the matching
        /// subscription in Main is typed <c>MQType</c> rather than string - confirm whether
        /// this test is intentionally disabled.
        /// </remarks>
        static void testWithMqType()
        {
            // Pick the publisher by transport type instead of using the default one.
            var dbPublisher = provider.GetService<PublisherFactory>().GetPublisher(MQType.Db);
            var payload = $"mqType is {MQType.Db}";
            dbPublisher.Publish("testWithMqType", payload);
        }
        /// <summary>
        /// Publishes the current clock second under "testRetry"; the handler registered in Main
        /// for that name always throws.
        /// </summary>
        public static void testRetry()
        {
            var publisher = provider.GetService<IPublisher>();
            var second = DateTime.Now.Second;
            publisher.Publish("testRetry", second);
        }
        /// <summary>
        /// Publishes the current clock second under "testRetry2"; the handler registered in Main
        /// for that name always returns false.
        /// </summary>
        public static void testRetry2()
        {
            var publisher = provider.GetService<IPublisher>();
            var second = DateTime.Now.Second;
            publisher.Publish("testRetry2", second);
        }
        /// <summary>
        /// Batch-publishes the list { 1, 2, 3 } under "test" (per the original note, consumers
        /// are expected to cope with collections).
        /// </summary>
        public static void testSendArray()
        {
            var publisher = provider.GetService<IPublisher>();
            var numbers = new List<int> { 1, 2, 3 };
            publisher.BatchPublish("test", numbers);
        }
        /// <summary>Publishes the current date/time under "testAsyncThrowException".</summary>
        public static void testAsyncThrowException()
        {
            var publisher = provider.GetService<IPublisher>();
            var now = DateTime.Now;
            publisher.Publish("testAsyncThrowException", now);
        }
        /// <summary>
        /// Sends a request under "testSendReq" carrying the current date/time and prints the response.
        /// </summary>
        public static void testSendReq()
        {
            var publisher = provider.GetService<IPublisher>();
            // NOTE(review): the third argument (5000) is presumably a timeout in milliseconds - confirm.
            var response = publisher.Request<string>("testSendReq", DateTime.Now, 5000);
            Console.WriteLine($"testSendReq response {response}");
        }
        /// <summary>
        /// Sends a request under "testSendReq3" with the current date/time as a string and prints
        /// the response. No subscription for that name is registered in this file.
        /// </summary>
        public static void testSendReqError()
        {
            var publisher = provider.GetService<IPublisher>();
            var payload = DateTime.Now.ToString();
            var response = publisher.Request<string>("testSendReq3", payload);
            Console.WriteLine($"testSendReq3 response {response}");
        }
        /// <summary>
        /// Payload type used by the "testBatch" subscription and by <c>testSendBatch</c>.
        /// </summary>
        public class batchObj
        {
            /// <summary>Item name.</summary>
            public string name { get; set; }

            /// <summary>Item value.</summary>
            public string value { get; set; }

            /// <summary>
            /// Returns both properties in readable form, so that interpolating an instance
            /// (as the "testBatch" handler does) shows its content instead of only the type name.
            /// </summary>
            public override string ToString()
            {
                return $"batchObj(name={name}, value={value})";
            }
        }
        /// <summary>
        /// Obtains the publisher for <c>MQType.Db</c> from the <c>PublisherFactory</c> and
        /// batch-publishes two <c>batchObj</c> items under "testBatch".
        /// </summary>
        public static void testSendBatch()
        {
            // Pick the publisher by transport type instead of using the default one.
            var dbPublisher = provider.GetService<PublisherFactory>().GetPublisher(MQType.Db);
            var items = new List<batchObj>
            {
                new batchObj { name = "3", value = "222" },
                new batchObj { name = "4", value = "222" },
            };
            dbPublisher.BatchPublish("testBatch", items);
        }
        /// <summary>
        /// Publishes "1111" under "testDelete" with message key "testDelete_1", then calls
        /// <c>Queue.DeleteMsg</c> with that key and prints the result.
        /// </summary>
        public static void testDeleteMsg()
        {
            const string key = "testDelete_1";
            var publisher = provider.GetService<IPublisher>();

            publisher.Publish("testDelete", "1111", opt =>
            {
                opt.MsgKey = key;
            });

            var deleted = publisher.Queue.DeleteMsg(key);
            Console.WriteLine($"delete {deleted}");
        }
        /// <summary>
        /// Publishes 1111 under "test" with message key "testConsumeMsg_1", then calls
        /// <c>Queue.ConsumeMsg</c> with that key and prints the result.
        /// </summary>
        public static void testConsumeMsg()
        {
            const string key = "testConsumeMsg_1";
            var publisher = provider.GetService<IPublisher>();

            publisher.Publish("test", 1111, opt =>
            {
                opt.MsgKey = key;
            });

            var consumed = publisher.Queue.ConsumeMsg(key);
            Console.WriteLine($"ConsumeMsg {consumed}");
        }
        /// <summary>
        /// Publishes 1111 three times under "testConsumeMsg2", then calls
        /// <c>Queue.ConsumeMsg&lt;int&gt;</c> for that name with 100 as the second argument and a
        /// callback that prints how many items it was handed.
        /// </summary>
        public static void testConsumeMsg2()
        {
            var publisher = provider.GetService<IPublisher>();

            for (var sent = 0; sent < 3; sent++)
            {
                publisher.Publish("testConsumeMsg2", 1111);
            }

            // NOTE(review): 100 is presumably the maximum number of messages to take - confirm.
            publisher.Queue.ConsumeMsg<int>("testConsumeMsg2", 100, received =>
            {
                Console.WriteLine($"ConsumeMsg {received.Count}");
            });
        }
    }
}
